Skip to content

Conversation

@zhongyu09
Copy link
Contributor

This PR is the same as #31269 to merge to branch 3.0

What changes were proposed in this pull request?

This PR is the same as #30998, but with a better UT.
In AdaptiveSparkPlanExec.getFinalPhysicalPlan, when newStages are generated, sort the new stages by class type to make sure BroadcastQueryState precede others.
This partial fix only grantee the start of materialization for BroadcastQueryStage is prior to others, but because the submission of collect job for broadcasting is run in another thread, the issue is not completely solved.

Why are the changes needed?

When enable AQE, in getFinalPhysicalPlan, spark traversal the physical plan bottom up and create query stage for materialized part by createQueryStages and materialize those new created query stages to submit map stages or broadcasting. When ShuffleQueryStage are materializing before BroadcastQueryStage, the map stage(job) and broadcast job are submitted almost at the same time, but map stage will hold all the computing resources. If the map stage runs slow (when lots of data needs to process and the resource is limited), the broadcast job cannot be started(and finished) before spark.sql.broadcastTimeout, thus cause whole job failed (introduced in SPARK-31475).
The workaround to increase spark.sql.broadcastTimeout doesn't make sense and graceful, because the data to broadcast is very small.

The order of calling materialize can guarantee that the order of task to be scheduled in normal circumstances, but, the guarantee is not strict since the submit of broadcast job and shuffle map job are in different thread.

  1. for broadcast job, call doPrepare() in main thread, and then start the real materialization in "broadcast-exchange-0" thread pool: calling getByteArrayRdd().collect() to submit collect job
  2. for shuffle map job, call ShuffleExchangeExec.mapOutputStatisticsFuture() which call sparkContext.submitMapStage() directly in main thread to submit map stage

1 is trigger before 2, so in normal cases, the broadcast job will be submit first.
However, we can not control how fast the two thread runs, so the "broadcast-exchange-0" thread could run a little bit slower than main thread, result in map stage submit first. So there's still risk for the shuffle map job schedule earlier before broadcast job.

Since completely fix the issue is complex and might introduce major changes, we need more time to follow up. This partial fix is better than do nothing, it resolved most cases in SPARK-33933.

Does this PR introduce any user-facing change?

NO

How was this patch tested?

Add UT

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@github-actions
Copy link

github-actions bot commented May 5, 2021

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label May 5, 2021
@github-actions github-actions bot closed this May 6, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants